package com.dahaonetwork.smartfactory.websocket.base;

import java.io.IOException;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import com.alibaba.fastjson.serializer.SerializerFeature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.dahaonetwork.smartfactory.websocket.model.Message;
import com.dahaonetwork.smartfactory.websocket.model.MessageType;
import com.dahaonetwork.smartfactory.websocket.util.WebSocketDispatcher;

/**
 * @ServerEndpoint 注解是一个类层次的注解，它的功能主要是将目前的类定义成一个websocket服务器端,
 * 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
 * @author liguobao
 *
 */
@ServerEndpoint("/{topicId}/{identityId}")
@Component
public class WebSocket {
	
	protected final Logger log = LoggerFactory.getLogger(this.getClass());  

    //静态变量，用来记录当前在线连接数。应该把它设计成线程安全的。
    private static int onlineCount = 0;
    /** 与某个客户端的连接会话，需要通过它来给客户端发送数据 */
    private Session session;
    /** 主题 */
	private String topicId;
	/** 通信唯一标识 */
	private String identityId;
	
	  /**
     * 连接建立成功调用的方法
     * @param session  可选的参数。session为与某个客户端的连接会话，需要通过它来给客户端发送数据
     */
	@OnOpen
    public void onOpen(@PathParam("topicId") String topicId, 
    		@PathParam("identityId") String identityId, Session session){
    	//填充属性
    	this.session = session;
    	this.topicId = topicId;
    	this.identityId = identityId;
    	WebSocketCollection.add(this);
    	addOnlineCount();           //在线数加1
    	log.info("有新连接加入！当前在线人数为" + getOnlineCount());
    }
	
	 /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(Session session, CloseReason resson){
    	WebSocketCollection.remove(this);
        subOnlineCount();           //在线数减1
        log.info("有一连接关闭！当前在线人数为" + getOnlineCount());
    }
    
    /**
     * 收到客户端消息后调用的方法
     * @param message 客户端发送过来的消息,并实现将此消息转发给同范围的客户
     * @param session 可选的参数
     * @throws IOException 
     */
    @OnMessage
    public void onMessage(String message, Session session) {
    	//同样先反序列化为我们的消息体，看是否需要转发，如果不能成为我们的bean，当做普通消息，保存到class属性中，不做其他处理
    	System.out.println("来自客户端的消息:" + message);
    	//得到message对象
    	Message msg =JSON.parseObject(message, Message.class);
    	try {
			WebSocketDispatcher.sendMessage(msg, null);
		} catch (IOException e) {
			log.error(e.getMessage(), e);
			log.warn("来自客户端的消息发送失败", e);
		}
    }
    
    /**
     * 发生错误时调用
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error){
        log.warn("webSocket错误", error);
    }

    /**
     * 这个方法与上面几个方法不一样。没有用注解，是根据自己需要添加的方法。
     * @param message
     * @throws IOException
     */
    public void sendMessage(Message message) throws IOException{
    	//需要顺带将本socket信息传过去
    	if(this.session.isOpen())
    	{
    		String messageContent = "";
    		MessageType type = getType(message.getType());
    		switch(type){
    		case content:
    			messageContent = message.getContent();
    			break;
    		default:
    			messageContent=JSON.toJSONString(message,SerializerFeature.WriteMapNullValue);
    		}
    		this.session.getBasicRemote().sendText(messageContent);//同步的发送信息
    		//this.session.getAsyncRemote().sendText(message);//异步的发送信息
    	}
    }
    
    private MessageType getType(String type){
    	if(null != type)
    		for(MessageType mt : MessageType.values()){
    			if(mt.isMe(type))
    				return mt;
    		}
    	//没有定义则放回默认的content类型
    	return MessageType.content;
    }
    
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocket.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocket.onlineCount--;
    }
    
    /**
     * @Package: com.jb.components.webSocket.base<br>
     * @ClassName: WebSocketCollection<br>
     * @Description: 内部类<br>
     */
    public static class WebSocketCollection {
    	//key为topicId共同组成,value为map（key为persId，value为webSocket实例的Set集合）
    	//由volatile标识，及时通知其他线程
    	public static volatile HashMap<String, HashMap<String, CopyOnWriteArraySet<WebSocket>>> webSocketMap = 
    			new HashMap<String, HashMap<String, CopyOnWriteArraySet<WebSocket>>>();
    	/**
    	 * @Title: add
    	 * @Description: 打开连接时想容器中添加登录人信息
    	 * @param socket
    	 * @return void
    	 * @throws
    	 */
    	@SuppressWarnings({ "serial" })
		static void add(final WebSocket socket){//需要定义成final是由于需要在线程安全的集合中使用，故不能修改
    		//向线程安全集合添加信息，锁定该集合(效率感觉会很低啊，注册不会向发消息那样不能容忍延迟，测试正在大量发消息时add会不会影响消息的发送:未看出明显延迟，毕竟注册只是少数)
//        	synchronized (WebSocketCollection.webSocketMap) {
        		String topicId = socket.topicId;
				final String identityId = socket.identityId;
        		//首先根据topicId获取该主题下的事件即唯一标识，若无，新增
				HashMap<String, CopyOnWriteArraySet<WebSocket>> identities = WebSocketCollection.webSocketMap.get(topicId);//read collection
            	if(identities == null){
            		//新增元素
            		WebSocketCollection.webSocketMap.put(topicId, 
            			new HashMap<String, CopyOnWriteArraySet<WebSocket>>(){
	            			{put(
	            					identityId, 
	            					new CopyOnWriteArraySet<WebSocket>(){
	            						{add(socket);}
	            					}
	            				);
	            			}
            			}
            		);//write collection
            	}else{
            		//根据identityId获取该事件下是否已经有webSocket连接，若无，新增，有，添加
            		Set<WebSocket> webSockets = identities.get(identityId);
            		if(webSockets == null){
            			identities.put(identityId, new CopyOnWriteArraySet<WebSocket>(){{add(socket);}});
            		}else{
            			//添加元素
            			webSockets.add(socket);//write collection
            		}
            		
            	}
//    		}
    	}
    	/**
    	 * @Title: remove
    	 * @Description: websocket关闭时解除该socket信息
    	 * @param socket
    	 * @return void
    	 * @throws
    	 */
    	static void remove(WebSocket socket){
    		//此步操作暂时未发现线程安全为题
    		CopyOnWriteArraySet<WebSocket> webSockets = WebSocketCollection.webSocketMap.get(socket.topicId).get(socket.identityId);//read first collection
    		//经分析，好像只有此处会涉及线程安全问题（本身是线程安全，但是推送消息方法中可能会获取该remove的socket实例，虽然移除掉在set中会及时通知，但是使用迭代器之后，不会讲迭代器内通知到）
    		webSockets.remove(socket);//write second mapCollection  thread safe!!!!!!!
    		//存在一个问题有待解决，即当某个事件下所有webSocket实例均remove，那么是否保留该identity的map？我认为应该保留，作为缓存保留，避免添加时再次判断
    	}
    }
}
